package cn.tannn.tregistry.client;

import cn.tannn.tregistry.core.api.Server;
import cn.tannn.tregistry.core.subscribe.MessageEvent;
import com.alibaba.fastjson2.JSON;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okhttp3.sse.EventSource;
import okhttp3.sse.EventSourceListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static cn.tannn.tregistry.core.http.HttpInvoker.log;
import static java.lang.Thread.sleep;

/**
 * 服务订阅
 *
 * @author tan
 */
public class SubEventSourceListener extends EventSourceListener {
    private static final Logger logger = LoggerFactory.getLogger(ServerClient.class);
    Runnable changeListener;
    ServerClient client;

    /**
     * 存储集记录的版本
     */
    Map<String, Long> VERSIONS = new HashMap<>();


    public SubEventSourceListener(Runnable changeListener, ServerClient client) {
        this.changeListener = changeListener;
        this.client = client;
    }

    @Override
    public void onOpen(EventSource eventSource, Response response) {
        logger.info(" subscribe sse连接...");
    }

    @Override
    public void onEvent(EventSource eventSource, String id, String type, String data) {
        if ("keep alive".equals(data)) {
            return;
        }
        logger.debug(" ===> client subscribe listener：{}", data);
        MessageEvent messageEvent = JSON.to(MessageEvent.class, data);
        if (messageEvent == null) {
            return;
        }
        Long version = VERSIONS.getOrDefault(messageEvent.getService(), -1L);
        Long newVersion = messageEvent.getVersion();
        logger.debug(" ===> client subscribe listener version：{} , newVersion {} ", version, newVersion);
        //  version < newverson(注册中心里的的版本) 才通知
        if (newVersion != null && newVersion > version) {
            // 客户端重载服务实例
            changeListener.run();
            // 更新版本
            VERSIONS.put(messageEvent.getService(), newVersion);
        }

    }

    @Override
    public void onClosed(EventSource eventSource) {
        logger.info(" ====>  subscribe sse closed ... ");
        relaunch();
    }

    @Override
    public void onFailure(EventSource eventSource, Throwable t, Response response) {
        relaunch();
        if (Objects.isNull(response)) {
            logger.error("subscribe  sse连接异常:{}", t);
            eventSource.cancel();
            return;
        }
        ResponseBody body = response.body();
        if (Objects.nonNull(body)) {
            logger.error("subscribe  sse连接异常data：{}，异常：{}", body, t);
        } else {
            logger.error("subscribe  sse连接异常data：{}，异常：{}", response, t);
        }
        eventSource.cancel();
    }

    public void relaunch() {
        try {
            // 延迟一下
            sleep(100);
        } catch (Exception e) {
            // ignore exception
        }
        // 服务不为空，重新 init 保证 subscribe 时 访问的 http有效
        if (!client.servers.isEmpty()) {
            List<String> connectString = client.servers.stream()
                    .filter(Server::isStatus)
                    // 当前leader下线了用从节点去 init
                    .filter(leader -> !leader.isLeader())
                    .map(url -> url.getUrl().replace("http://", "")).toList();
            log.debug(" ====>>>> relaunch subscribe init serverClient connectString : {} ", connectString);
            client.init(connectString);
        }

        // 重新发起订阅
        client.RENEWS.keySet().forEach(
                instance -> {
                    logger.info(" ====>  relaunch subscribe {}... ", VERSIONS);
                    for (String service : client.RENEWS.get(instance)) {
                        client.subscribe(service, changeListener);
                    }

                }
        );
    }
}
